Overview
fetch_indices_ohlcv.py is a Phase 2.5 data acquisition script that fetches and maintains historical OHLCV (Open, High, Low, Close, Volume) data for major NSE indices. It uses a hybrid incremental approach that merges deep historical data with today’s live snapshot.
Pipeline Position: Phase 2.5 - Runs after stock OHLCV fetch, before market breadth processing
Critical Function: Provides index price data required for historical market breadth calculations and dashboard charting
Purpose
This script:
- Fetches deep historical OHLCV data from Dhan’s ScanX API
- Incrementally updates existing CSV files (only fetches missing date ranges)
- Merges today’s live snapshot from all_indices_list.json
- Outputs individual CSV files for each index
Input Files
all_indices_list.json
Index list with live market data, including today’s OHLCV snapshot. Required fields per index:
- Symbol: Index symbol (e.g., “NIFTY”, “NIFTY MIDCAP 150”)
- Exchange: Exchange code (e.g., “NSE”)
- Segment: Segment code
- Instrument: Instrument type
- IndexID: Unique index identifier
- Ltp: Current close price (used as today’s Close)
- Open, High, Low: Today’s OHLC values
- Volume: Today’s volume
Existing index OHLCV files (if present, only missing dates are fetched)
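A minimal sketch of loading and validating the input list, assuming the JSON is an array of index objects with the fields above (the sample values here are illustrative, not real market data):

```python
import json

# Minimal sample mirroring the required input fields (values illustrative)
sample = '''[{"Symbol": "NIFTY", "Exchange": "NSE", "Segment": "IDX",
              "Instrument": "INDEX", "IndexID": 13, "Ltp": 18030.75,
              "Open": 18000.0, "High": 18050.25, "Low": 17980.5,
              "Volume": 5000000}]'''
indices = json.loads(sample)

# Verify every index object carries the fields the script depends on
REQUIRED = {"Symbol", "Exchange", "Segment", "Instrument", "IndexID",
            "Ltp", "Open", "High", "Low", "Volume"}
missing = [i["Symbol"] for i in indices if not REQUIRED <= i.keys()]
```

In practice the script would read the real all_indices_list.json from disk; validating the field set up front makes missing-field failures explicit instead of surfacing later as KeyErrors.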
Output Files
Individual CSV files for each index with standardized naming.
Filename pattern: {SAFE_SYMBOL}.csv
- Spaces and special characters replaced with underscores
- Example: NIFTY_MIDCAP_150.csv, NIFTY.csv
CSV Structure:

```csv
Date,Open,High,Low,Close,Volume
2023-01-01,18000.00,18050.25,17980.50,18030.75,5000000
2023-01-02,18030.75,18100.00,18010.00,18085.50,5200000
```
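Given this layout, a downstream consumer can load any index file with the standard library. A sketch using the sample rows above (parsing from a string here so the example is self-contained):

```python
import csv
import io

# Sample content matching the CSV structure above (illustrative data)
sample = """Date,Open,High,Low,Close,Volume
2023-01-01,18000.00,18050.25,17980.50,18030.75,5000000
2023-01-02,18030.75,18100.00,18010.00,18085.50,5200000
"""

def load_index_rows(text):
    """Parse index OHLCV CSV text into a list of dicts with typed values."""
    rows = []
    for r in csv.DictReader(io.StringIO(text)):
        rows.append({
            "Date": r["Date"],
            "Open": float(r["Open"]),
            "High": float(r["High"]),
            "Low": float(r["Low"]),
            "Close": float(r["Close"]),
            "Volume": int(r["Volume"]),
        })
    return rows

rows = load_index_rows(sample)
```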
Processing Logic
1. Incremental Sync Detection
Checks existing CSV files and determines date ranges to fetch:
```python
existing_data_cache = {}
for idx in indices:
    sym = idx["Symbol"]
    safe_sym = get_safe_sym(sym)
    output_path = os.path.join(OUTPUT_DIR, f"{safe_sym}.csv")
    target_start = global_start_ts
    if os.path.exists(output_path):
        try:
            with open(output_path, "r") as f:
                rows = list(csv.DictReader(f))
            if rows:
                existing_data_cache[safe_sym] = rows
                last_row_date = rows[-1]["Date"]
                # Resume fetching from the day after the last stored row
                last_dt = datetime.strptime(last_row_date, "%Y-%m-%d")
                target_start = int(last_dt.timestamp()) + 86400
        except Exception:
            # Corrupted CSV: fall back to re-fetching the full history
            pass
```
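The target_start arithmetic can be checked in isolation: the day after the last stored row becomes the fetch start, so an up-to-date file yields no gap to fill. A self-contained sketch of that computation:

```python
from datetime import datetime

def next_fetch_start(last_row_date):
    """Return the epoch timestamp one day after the last stored Date,
    mirroring the target_start computation above."""
    last_dt = datetime.strptime(last_row_date, "%Y-%m-%d")
    return int(last_dt.timestamp()) + 86400

# The resulting timestamp falls on the following calendar day
ts = next_fetch_start("2024-01-10")
```

Note that adding 86400 seconds assumes no daylight-saving jump between the two midnights; for daily OHLCV data this is a harmless simplification.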
2. Chunked Historical Fetch
Fetches data in 120-day chunks using multithreading:
```python
CHUNK_DAYS = 120
MAX_THREADS = 60
global_start_ts = 215634600  # 1976

# Only crawl if there's a gap before today
if target_start < global_end_ts - 86400:
    current_end = global_end_ts
    while current_end > target_start:
        c_start = max(target_start, current_end - (CHUNK_DAYS * 86400))
        tasks.append({
            "EXCH": idx["Exchange"], "SYM": sym, "SEG": idx["Segment"],
            "INST": idx["Instrument"], "SEC_ID": idx["IndexID"],
            "EXPCODE": 0, "INTERVAL": "D", "START": c_start, "END": current_end,
            "SAFE_SYM": safe_sym,
        })
        current_end = c_start - 86400
```
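To see how this loop partitions a range, here is a self-contained version that returns just the (start, end) boundaries, walking backwards from the end in 120-day windows:

```python
CHUNK_DAYS = 120
DAY = 86400

def chunk_ranges(target_start, global_end_ts, chunk_days=CHUNK_DAYS):
    """Walk backwards from the end timestamp in chunk_days-sized windows,
    mirroring the chunking loop above."""
    chunks = []
    current_end = global_end_ts
    while current_end > target_start:
        c_start = max(target_start, current_end - chunk_days * DAY)
        chunks.append((c_start, current_end))
        current_end = c_start - DAY
    return chunks

# 400 days of history splits into four chunks, newest first
ranges = chunk_ranges(0, 400 * DAY)
```

Walking newest-first means the most recent (and most useful) data arrives even if older chunks fail.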
3. Parallel API Execution
Uses ThreadPoolExecutor for high-speed fetching:
```python
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    future_to_payload = {executor.submit(fetch_chunk, t): t for t in tasks}
    for future in as_completed(future_to_payload):
        payload = future_to_payload[future]
        rows = future.result()
        if rows:
            new_data[payload["SAFE_SYM"]].extend(rows)
```
```python
def fetch_chunk(payload):
    try:
        r = requests.post("https://openweb-ticks.dhan.co/getDataH",
                          json=payload, headers=get_headers(), timeout=10)
        if r.status_code == 200:
            data = r.json().get("data", {})
            times = data.get("Time", [])
            if not times:
                return []
            o, h, l, c, v = (data.get("o", []), data.get("h", []),
                             data.get("l", []), data.get("c", []),
                             data.get("v", []))
            rows = []
            for i, t in enumerate(times):
                # API may return dates as strings or Unix timestamps
                dt_str = t if isinstance(t, str) else datetime.fromtimestamp(t).strftime("%Y-%m-%d")
                rows.append({
                    'Date': dt_str,
                    'Open': o[i], 'High': h[i], 'Low': l[i],
                    'Close': c[i], 'Volume': v[i],
                })
            return rows
    except Exception:
        # Timeouts and network errors: skip this chunk silently
        pass
    return []
```
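The submit/as_completed pattern above can be exercised without the network by swapping in a stub fetcher (the stub and its payload keys mirror the script; the data is illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict

def fetch_stub(payload):
    """Stand-in for fetch_chunk: returns one fake row per task."""
    return [{"Date": payload["START"], "Close": 1.0}]

tasks = [{"SAFE_SYM": "NIFTY", "START": s}
         for s in ("2024-01-01", "2024-01-02")]
new_data = defaultdict(list)

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its payload so results can be grouped by symbol
    future_to_payload = {executor.submit(fetch_stub, t): t for t in tasks}
    for future in as_completed(future_to_payload):
        payload = future_to_payload[future]
        rows = future.result()
        if rows:
            new_data[payload["SAFE_SYM"]].extend(rows)
```

Using a defaultdict(list) for new_data keeps the extend call safe even for symbols that have produced no rows yet.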
4. Live Data Merge
Merges today’s snapshot with historical data:
```python
today_str = datetime.now().strftime("%Y-%m-%d")
for idx in indices:
    safe_sym = get_safe_sym(idx["Symbol"])

    # 1. Start with existing or freshly fetched historical data
    base_rows = existing_data_cache.get(safe_sym, [])
    fetched_rows = new_data.get(safe_sym, [])
    all_rows = base_rows + fetched_rows

    # 2. Add TODAY'S snapshot from all_indices_list.json
    today_row = {
        'Date': today_str,
        'Open': idx.get('Open'),
        'High': idx.get('High'),
        'Low': idx.get('Low'),
        'Close': idx.get('Ltp'),  # Ltp is Close for the running day
        'Volume': idx.get('Volume', 0),
    }

    # 3. Deduplicate by date and apply today's snapshot last
    merged = {r['Date']: r for r in all_rows}
    merged[today_str] = today_row
    final_rows = sorted(merged.values(), key=lambda x: x['Date'])
```
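The dedup-and-sort step can be exercised in isolation. In the dict comprehension, later rows win for a duplicate Date, and assigning the live snapshot afterwards guarantees it overrides any stale row for today (rows trimmed to Date/Close for brevity; values illustrative):

```python
# Historical rows, including a stale intraday value for today
base_rows = [
    {"Date": "2024-01-09", "Close": 18000.0},
    {"Date": "2024-01-10", "Close": 18050.0},  # stale intraday value
]
fetched_rows = [{"Date": "2024-01-08", "Close": 17950.0}]
today_row = {"Date": "2024-01-10", "Close": 18085.5}  # live Ltp snapshot

# Keying by Date deduplicates; today's snapshot is applied last so it wins
merged = {r["Date"]: r for r in base_rows + fetched_rows}
merged[today_row["Date"]] = today_row
final_rows = sorted(merged.values(), key=lambda x: x["Date"])
```

Sorting by the ISO "YYYY-MM-DD" string is safe because lexicographic and chronological order coincide for that format.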
5. CSV Writing
```python
output_path = os.path.join(OUTPUT_DIR, f"{safe_sym}.csv")
with open(output_path, "w", newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['Date', 'Open', 'High', 'Low', 'Close', 'Volume'])
    writer.writeheader()
    writer.writerows(final_rows)
```
Configuration
- CHUNK_DAYS (int, default: 120): Number of days per API request chunk (sized for rate limiting)
- MAX_THREADS (int, default: 60): Maximum number of concurrent API requests
- OUTPUT_DIR (string, default: "indices_ohlcv_data"): Directory where index CSV files are saved
Supported Indices
The script processes all indices in all_indices_list.json, commonly including:
- NIFTY (Nifty 50)
- NIFTY 500
- NIFTY MIDCAP 150
- NIFTY SMALLCAP 250
- NIFTY MIDSMALLCAP 400
- Sector indices (Bank, IT, Pharma, etc.)
- Thematic indices
Usage Example
```shell
python fetch_indices_ohlcv.py
```
Expected Output:
```
Checking 147 indices for sync...
Executing 423 API chunks for history...
Merging with Live Snapshots and saving CSVs...
Successfully updated all index CSVs with Today's Live data.
```
Incremental Updates: The script only fetches date ranges that are missing from existing CSV files, making subsequent runs extremely fast.
Multithreading: With 60 concurrent threads, the script can fetch decades of data for 100+ indices in under 2 minutes.
Error Handling
- API Timeout: Individual chunk failures are silently skipped (fetch_chunk returns an empty list)
- Malformed CSV: If an existing CSV is corrupted, the script re-fetches that index’s full history
- Missing today’s data: If Ltp is missing from the input JSON, 0 is used as a fallback
- Date parsing errors: Handles both string dates and Unix timestamps from the API
Data Quality
Today’s Data Source: The “Close” price for today comes from the Ltp field in all_indices_list.json, which updates in real-time during market hours. After market close, this represents the final close price.
Symbol Sanitization
```python
def get_safe_sym(sym):
    return "".join([c if c.isalnum() else "_" for c in sym])
```
Converts symbols to filesystem-safe names:
"NIFTY MIDCAP 150" → "NIFTY_MIDCAP_150"
"NIFTY 50" → "NIFTY_50"
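The conversions above can be checked directly (the function is repeated here so the example is self-contained):

```python
def get_safe_sym(sym):
    # Replace every non-alphanumeric character with an underscore
    return "".join([c if c.isalnum() else "_" for c in sym])

print(get_safe_sym("NIFTY MIDCAP 150"))  # NIFTY_MIDCAP_150
print(get_safe_sym("NIFTY 50"))          # NIFTY_50
```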
API Endpoint
URL: https://openweb-ticks.dhan.co/getDataH
Method: POST
Payload Example:
```json
{
  "EXCH": "NSE",
  "SYM": "NIFTY",
  "SEG": "IDX",
  "INST": "INDEX",
  "SEC_ID": 13,
  "EXPCODE": 0,
  "INTERVAL": "D",
  "START": 1640995200,
  "END": 1672531200
}
```
Response Example:
```
{
  "data": {
    "Time": [1640995200, 1641081600, ...],
    "o": [18000.0, 18050.0, ...],
    "h": [18100.0, 18150.0, ...],
    "l": [17950.0, 18000.0, ...],
    "c": [18030.0, 18080.0, ...],
    "v": [5000000, 5200000, ...]
  }
}
```
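The columnar arrays in the response map one-to-one onto per-day rows. A minimal parser over a sample shaped like the response above, mirroring fetch_chunk without the HTTP call (UTC is used here to keep the sketch deterministic; the script itself uses local time):

```python
from datetime import datetime, timezone

# Sample columnar payload shaped like the response above (values illustrative)
data = {
    "Time": [1640995200, 1641081600],
    "o": [18000.0, 18050.0],
    "h": [18100.0, 18150.0],
    "l": [17950.0, 18000.0],
    "c": [18030.0, 18080.0],
    "v": [5000000, 5200000],
}

def to_rows(data):
    """Zip the parallel O/H/L/C/V arrays into one dict per trading day."""
    rows = []
    for t, o, h, l, c, v in zip(data["Time"], data["o"], data["h"],
                                data["l"], data["c"], data["v"]):
        # Epoch seconds -> YYYY-MM-DD
        dt_str = datetime.fromtimestamp(t, tz=timezone.utc).strftime("%Y-%m-%d")
        rows.append({"Date": dt_str, "Open": o, "High": h,
                     "Low": l, "Close": c, "Volume": v})
    return rows

rows = to_rows(data)
```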